package com.xiaofan.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.{DataTypes, FieldExpression}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/**
 * Demo job for the Table API file system connector.
 *
 * Registers a CSV file as the table `inputTable`, derives two tables from it
 * (a filtered projection and a per-id count), prints the count as a retract
 * stream, and writes the filtered projection to a CSV file registered as
 * `outputTable`.
 *
 * Optional program arguments (both default to the original hard-coded paths):
 *   - `args(0)`: path of the input CSV file
 *   - `args(1)`: path of the output CSV file
 */
object FileOutputTest {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath =
    "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

  /** Output file used when no path is passed on the command line. */
  private val DefaultOutputPath =
    "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\output1.txt"

  /**
   * Builds and runs the pipeline.
   *
   * @param args optional `[inputPath, outputPath]`; missing entries fall back to the defaults
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    val filePath = args.lift(0).getOrElse(DefaultInputPath)

    // Source table: CSV rows of (id, timestamp, temp).
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable")

    val sensorTable: Table = tableEnv.from("inputTable")

    // Simple transformation: projection + filter, produces insert-only rows.
    val resultTable: Table = sensorTable
      .select($"id", $"temp")
      .filter($"id" === "sensor_1")

    // Aggregation: the count per id is updated as new rows arrive.
    val aggTable: Table = sensorTable
      .groupBy($"id")
      .select($"id", $"id".count as "count")

    // Uncomment to also print the insert-only result:
    // tableEnv.toAppendStream[(String, Double)](resultTable).print("append")
    // The aggregate emits updates, so it has to be converted as a retract stream.
    tableEnv.toRetractStream[(String, Long)](aggTable).print("agg")

    // Write to a file.
    val outputPath = args.lift(1).getOrElse(DefaultOutputPath)

    // Sink schema mirrors resultTable: (id, temp).
    tableEnv.connect(new FileSystem().path(outputPath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("outputTable")

    // The file system sink only supports append (insert-only) tables. Inserting the
    // updating aggTable here is rejected at runtime, so the insert-only resultTable
    // is written instead.
    resultTable.executeInsert("outputTable")

    // Runs the DataStream part of the program (the "agg" print above).
    env.execute("file output test")
  }
}
